Skip to content

Conversation

@EstelleDa
Copy link
Member

No description provided.

@EstelleDa EstelleDa linked an issue Oct 17, 2025 that may be closed by this pull request
Copy link
Collaborator

@bencap bencap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this Estelle, and sorry for the delay in the review. I left some comments on an implementation that I think would scale better for future namespaces and simplifies some of the logic with renaming columns into their respective namespaces. Let me know what you think and if you wanted to chat about it at all!

The score set to get the variants from.
data_type : {'scores', 'counts'}
The type of data to get. Either 'scores' or 'counts'.
data_types : List[Literal["scores", "counts", "clinVar", "gnomAD"]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if namespaces would be a better name for this now. The other thing that's a little tricky is we're really working with two distinct types of namespaces: score / count data supplied by the investigator and different namespaces for variant data we cache in our database. While we fetch score / count columns via one method, we will fetch data for other namespaces by a different one.

Copy link
Member Author

@EstelleDa EstelleDa Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the namespaces in here. It's clearer. I also considered whether put all of them in one function cause it's quite complex and messy. Separating them makes it easy to distinguish whether the data is from ours or others. We can modify this part when we're ready to add the clinVar and gnomAD's data.

Comment on lines +440 to +466
custom_columns = {
"scores": "score_columns",
"counts": "count_columns",
}
custom_columns_set = [custom_columns[dt] for dt in data_types if dt in custom_columns]
type_to_column = {
"scores": "score_data",
"counts": "count_data"
}
type_columns = [type_to_column[dt] for dt in data_types if dt in type_to_column]
columns = ["accession", "hgvs_nt", "hgvs_splice", "hgvs_pro"]
if include_post_mapped_hgvs:
columns.append("post_mapped_hgvs_g")
columns.append("post_mapped_hgvs_p")

if include_custom_columns:
custom_columns = [str(x) for x in list(score_set.dataset_columns.get(custom_columns_set, []))]
columns += custom_columns
elif data_type == "scores":
for column in custom_columns_set:
dataset_columns = [str(x) for x in list(score_set.dataset_columns.get(column, []))]
if column == "score_columns":
for c in dataset_columns:
prefixed = "scores." + c
columns.append(prefixed)
elif column == "count_columns":
for c in dataset_columns:
prefixed = "counts." + c
columns.append(prefixed)
elif len(data_types) == 1 and data_types[0] == "scores":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, we might consider maintaining a dictionary of all of our namespaces. Each namespace keys a list of column names. The idea here is that if we combine the outer key and any column name from the contained list, we get a namespaced column. This will let us both easily generate a namespaced column name while also retaining efficient and easy access to the real internal column name. The other benefit is that we avoid key collisions right from the start of the CSV generation process, so we won't have to worry about identifier columns colliding between say, ClinVar and gnomAD namespaces.

In this suggestion, I also eliminated the dictionaries for the internal mappings of the column keys. I think that we are only ever going to have two of those, and doing each of them explicitly is more readable.

Note that on the last line, I think we want to add the score column whenever scores is in the data types list and we aren't including custom columns. Right now if you supplied [clinvar, scores] and include_custom_columns = False, you wouldn't get the score column as output which isn't what we want.

Suggested change
custom_columns = {
"scores": "score_columns",
"counts": "count_columns",
}
custom_columns_set = [custom_columns[dt] for dt in data_types if dt in custom_columns]
type_to_column = {
"scores": "score_data",
"counts": "count_data"
}
type_columns = [type_to_column[dt] for dt in data_types if dt in type_to_column]
columns = ["accession", "hgvs_nt", "hgvs_splice", "hgvs_pro"]
if include_post_mapped_hgvs:
columns.append("post_mapped_hgvs_g")
columns.append("post_mapped_hgvs_p")
if include_custom_columns:
custom_columns = [str(x) for x in list(score_set.dataset_columns.get(custom_columns_set, []))]
columns += custom_columns
elif data_type == "scores":
for column in custom_columns_set:
dataset_columns = [str(x) for x in list(score_set.dataset_columns.get(column, []))]
if column == "score_columns":
for c in dataset_columns:
prefixed = "scores." + c
columns.append(prefixed)
elif column == "count_columns":
for c in dataset_columns:
prefixed = "counts." + c
columns.append(prefixed)
elif len(data_types) == 1 and data_types[0] == "scores":
namespaced_score_set_columns: dict[str, list[str]] = {
"core": ["accession", "hgvs_nt", "hgvs_splice", "hgvs_pro"],
"mavedb": [],
}
if include_post_mapped_hgvs:
namespaced_score_set_columns["mavedb"].append("post_mapped_hgvs_g")
namespaced_score_set_columns["mavedb"].append("post_mapped_hgvs_p")
for namespace in data_types:
namespaced_score_set_columns[namespace] = []
if include_custom_columns:
if "scores" in namespaced_score_set_columns:
namespaced_score_set_columns["scores"] = [
col for col in [str(x) for x in list(score_set.dataset_columns.get("score_columns", []))]
]
if "counts" in namespaced_score_set_columns:
namespaced_score_set_columns["counts"] = [
col for col in [str(x) for x in list(score_set.dataset_columns.get("count_columns", []))]
]
elif "scores" in namespaced_score_set_columns:

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@EstelleDa One thing @sallybg suggested to me today was to put the hgvs strings into their own, more descriptive, namespace rather than mavedb. I'm tempted by something like hgvs but that might be confusing considering we have other columns that start with hgvs. So maybe mapped_hgvs is most precise. We can put other identifiers into their own namespaces, so I think having an hgvs specific one is best.

Comment on lines 600 to 639
row = {}
for column_key in columns:
if column_key == "hgvs_nt":
value = str(variant.hgvs_nt)
elif column_key == "hgvs_pro":
value = str(variant.hgvs_pro)
elif column_key == "hgvs_splice":
value = str(variant.hgvs_splice)
elif column_key == "accession":
value = str(variant.urn)
elif column_key == "post_mapped_hgvs_g":
hgvs_str = get_hgvs_from_post_mapped(mapping.post_mapped) if mapping and mapping.post_mapped else None
if hgvs_str is not None and is_hgvs_g(hgvs_str):
value = hgvs_str
else:
value = ""
elif column_key == "post_mapped_hgvs_p":
hgvs_str = get_hgvs_from_post_mapped(mapping.post_mapped) if mapping and mapping.post_mapped else None
if hgvs_str is not None and is_hgvs_p(hgvs_str):
value = hgvs_str
else:
value = ""
else:
parent = variant.data.get(dtype) if variant.data else None
value = str(parent.get(column_key)) if parent else na_rep
for dt in dtype:
parent = variant.data.get(dt) if variant.data else None
if column_key.startswith("scores."):
inner_key = column_key.replace("scores.", "")
elif column_key.startswith("counts."):
inner_key = column_key.replace("counts.", "")
else:
# fallback for non-prefixed columns
inner_key = column_key
if parent and inner_key in parent:
value = str(parent[inner_key])
break
if is_null(value):
value = na_rep
row[column_key] = value

return row
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The biggest difference in this implementation is here, where we now stop falling back on custom keys in the else block and instead handle all keys in each namespace explicitly. If a column name is in a namespaced columns list and it isn't found by this function, it gets a value of None. I like this though because we are handling exports at the namespace level and we have to explicitly define the export behavior for each key.

This is also where we rename the columns so we can get their namespaces. Since we still have access to the raw column name, it's more readable to do that renaming and we don't need to do anything with string manipulation.

(i renamed the dtype parameter to namespaces.

Suggested change
row: dict[str, Any] = {}
# Handle each column key explicitly as part of its namespace.
for column_key in namespaces.get("core", []):
if column_key == "hgvs_nt":
value = str(variant.hgvs_nt)
elif column_key == "hgvs_pro":
value = str(variant.hgvs_pro)
elif column_key == "hgvs_splice":
value = str(variant.hgvs_splice)
elif column_key == "accession":
value = str(variant.urn)
if is_null(value):
value = na_rep
# export columns in the `core` namespace without a namespace
row[column_key] = value
for column_key in namespaces.get("mavedb", []):
if column_key == "post_mapped_hgvs_g":
hgvs_str = get_hgvs_from_post_mapped(mapping.post_mapped) if mapping and mapping.post_mapped else None
if hgvs_str is not None and is_hgvs_g(hgvs_str):
value = hgvs_str
else:
value = ""
elif column_key == "post_mapped_hgvs_p":
hgvs_str = get_hgvs_from_post_mapped(mapping.post_mapped) if mapping and mapping.post_mapped else None
if hgvs_str is not None and is_hgvs_p(hgvs_str):
value = hgvs_str
else:
value = ""
if is_null(value):
value = na_rep
row[f"mavedb.{column_key}"] = value
for column_key in namespaces.get("scores", []):
parent = variant.data.get("score_data") if variant.data else None
value = str(parent.get(column_key)) if parent else na_rep
row[f"scores.{column_key}"] = value
for column_key in namespaces.get("counts", []):
parent = variant.data.get("count_data") if variant.data else None
value = str(parent.get(column_key)) if parent else na_rep
row[f"counts.{column_key}"] = value
return row


rows_data = variants_to_csv_rows(variants, columns=columns, dtype=type_column, mappings=mappings) # type: ignore
rows_data = variants_to_csv_rows(variants, columns=columns, dtype=type_columns, mappings=mappings) # type: ignore

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing about this implementation is that by the time we get to this point we're already done. We've fetched and renamed all our export files. We just need to generate a list of columns for the drop_na_columns_from_csv_file_rows function.

    rows_columns = [f"{namespace}.{col}" for namespace, cols in namespaced_score_set_columns.items() for col in cols]

One thing that your solution did that this one doesn't do is remove the prefix from columns if only one namespace is being exported. For consistency with the new changes, I think I actually prefer to always export columns inside their namespace. This implementation also would require a few small changes to some of the other CSV export routers, but I think that would be trivial.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Namespaced variant CSV export

2 participants